package com.czxy

import java.util.Properties

import com.czxy.bean.{HBaseMeta, TagRule}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object BlckModel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("black").master("local[*]").getOrCreate()

    import spark.implicits._
    //读取mysql中的数据
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")

    val MysqlDataf: DataFrame = spark.read.jdbc("jdbc:mysql://bd001:3306/tags_new", "tbl_basic_tag", prop)

    //获取4级标签的数据
    val ruleBlack: Dataset[Row] = MysqlDataf.select("id", "rule").where("id=73")

    //解析封装成HBaseMeta
    val RuleMap = ruleBlack.map(row => {
      val rule = row.getAs("rule").toString
      rule.split("##").map(arrs => {
        val strings = arrs.split("=")
        (strings(0), strings(1))
      })
    }).collectAsList().get(0).toMap

    val hBaseMeta = toHbaseMeta(RuleMap)

    //TODO 获取黑名单五级标签对应的规则
    val blackFileTags = MysqlDataf.select("id", "rule").where("pid=73")

    //将util.List转换成list   需要隐式转换
    import scala.collection.JavaConverters._
    val tagrule = blackFileTags.map(row => {
      val id = row.getAs("id").toString
      val rule = row.getAs("rule").toString
      TagRule(id.toInt, rule)
    }).collectAsList().asScala.toList

    //链接hbase
    val hbaseData = spark.read.format("com.czxy.tools.HBaseDataSource")
      .option(HBaseMeta.FAMILY, hBaseMeta.family)
      .option(HBaseMeta.HBASETABLE, hBaseMeta.hbaseTable)
      .option(HBaseMeta.INTYPE, hBaseMeta.inType)
      .option(HBaseMeta.ZKHOSTS, hBaseMeta.zkHosts)
      .option(HBaseMeta.ZKPORT, hBaseMeta.zkPort)
      .option(HBaseMeta.SELECTFIELDS, hBaseMeta.selectFields)
      .option(HBaseMeta.ROWKEY, hBaseMeta.rowKey)
      .load()

    //引入sparkSQL的内置函数
    import org.apache.spark.sql.functions._
    val blackId = udf((black: String) => {
      var id: Int = 0
      for (rules <- tagrule) {
        if (black == "true" && rules.rule == "0") {
          id = rules.id
        } else if (black == "false" && rules.rule == "1"){
          id = rules.id
        }
      }
      id
    })

    val data: DataFrame = hbaseData.select('id as ("id"), blackId('is_blackList).as("is_blackList"))

    data.show(10)

  }

  /**
    * 封装metahbase
    */

  def toHbaseMeta(RuleMap: Map[String, String]) = {
    HBaseMeta(RuleMap.getOrElse("inType", ""),
      RuleMap.getOrElse("zkHosts", ""),
      RuleMap.getOrElse("zkPort", ""),
      RuleMap.getOrElse("hbaseTable", ""),
      RuleMap.getOrElse("family", ""),
      RuleMap.getOrElse("selectFields", ""),
      RuleMap.getOrElse("rowKey", "")
    )
  }
}
